---
id: dmtpbase
title: Dmtp基础功能
---

### 定义

命名空间：TouchSocket.Dmtp <br/>
程序集：[TouchSocket.Dmtp.dll](https://www.nuget.org/packages/TouchSocket.Dmtp)

## 一、连接

连接验证可以初步保证连接客户端的安全性。框架内部默认使用一个string类型的Token作为验证凭证。当然也允许服务器进行其他验证。具体如下：

### 1.1 Token验证
在服务器或客户端的配置上，设置VerifyToken，即可实现字符串Token验证。

```csharp showLineNumbers
var config = new TouchSocketConfig()
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Dmtp"
    });
```

### 1.2 动态验证

使用插件，实现**IDmtpHandshakingPlugin**插件。然后可以自行判断一些信息，比如：IP地址、元数据等。

设置元数据。

```csharp showLineNumbers
var config = new TouchSocketConfig()
    .SetDmtpOption(new DmtpOption()
    {
        VerifyToken = "Dmtp",
        Metadata=new Metadata().Add("a","a")
    });
```

```csharp showLineNumbers
internal class MyVerifyPlugin : PluginBase,IDmtpHandshakingPlugin
{

    public async Task OnDmtpHandshaking(IDmtpActorObject client, DmtpVerifyEventArgs e)
    {
        if (e.Metadata["a"] != "a")
        {
            e.IsPermitOperation = false;//不允许连接
            e.Message = "元数据不对";//同时返回消息
            e.Handled = true;//表示该消息已在此处处理。
            return;
        }
        if (e.Token == "Dmtp")
        {
            e.IsPermitOperation = true;
            e.Handled = true;
            return;
        }

        await e.InvokeNext();
    }
}
```

### 1.4 跨语言

为使Dmtp服务器支持跨语言，Dmtp在设计之初就预留了跨语言连接的便利性。诚如[Dmtp描述](./dmtpdescription.mdx)所示，其基础数据报文为`Flags+Length+Data`。而框架内部的Handshake、Ping、Pong、Close等指令均是采用Json数据格式。但是及时如此，连接时的真正数据，还与其基础协议有关。具体如下：

以连接、操作`TcpDmtpService`为例。其基础协议即为`tcp`，则使用常规的`tcp`客户端即可模拟链接。

```csharp showLineNumbers
using var tcpClient = new TcpClient();//创建一个普通的tcp客户端。
tcpClient.Received = (client, e) =>
{
    //此处接收服务器返回的消息

    var head = e.ByteBlock.ToArray(0,2);
    e.ByteBlock.Seek(2, SeekOrigin.Begin);
    var flags = e.ByteBlock.ReadUInt16(bigEndian: true);
    var length = e.ByteBlock.ReadInt32(bigEndian: true);

    var json = Encoding.UTF8.GetString(e.ByteBlock.Buffer, e.ByteBlock.Pos, e.ByteBlock.CanReadLen);

    ConsoleLogger.Default.Info($"收到响应：flags={flags}，length={length}，json={json}");
    return Task.CompletedTask;
};

//开始链接服务器
tcpClient.Connect("127.0.0.1:7789");

//以json的数据方式。
//其中Token、Metadata为连接的验证数据，分别为字符串、字符串字典类型。
//Id则表示指定的默认id，字符串类型。
//Sign为本次请求的序号，一般在连接时指定一个大于0的任意数字即可。
var json = @"{""Token"":""Dmtp"",""Metadata"":{""a"":""a""},""Id"":null,""Sign"":1}";

//将json转为utf-8编码。
var jsonBytes = Encoding.UTF8.GetBytes(json);

using (var byteBlock = new ByteBlock())
{
    //按照Head+Flags+Length+Data的格式。
    byteBlock.Write(Encoding.ASCII.GetBytes("dm"));
    byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((ushort)1));
    byteBlock.Write(TouchSocketBitConverter.BigEndian.GetBytes((int)jsonBytes.Length));
    byteBlock.Write(jsonBytes);

    tcpClient.SendAsync(byteBlock);
}
await Task.Delay(2000);
```

接收输出：

收到的Json字符串，会返回服务器最终修改的Token、Metadata。同时还包括分配或指定的Id。Sign会与请求时一致，表示这是同一组请求。Status等于1即为连接成功。其他值则可能在Message表明连接失败的原因。

```
收到响应：flags=2,length=124,json={"Token":"Dmtp","Metadata":{"a":"a"},"Id":"1","Message":null,"Sign":1,"Status":1}
```

其他Json请求，包括：

【请求连接】

- Token：连接令牌，字符串类型。
- Metadata：连接元数据，字典类型。
- Id：初始连接Id，字符串类型。
- Sign：请求序号，整数类型，应当每次请求递增。
- Status(仅返回时携带)：连接状态，整数类型，为1时成功，其他状态可以获取Message。
- Message(仅返回时携带)：连接消息。

```javascript
{"Token":"Dmtp","Metadata":{"a":"a"},"Id":null,"Sign":1}
```

【Ping】

- Sign：请求序号，整数类型，应当每次请求递增。
- Status(仅返回时携带)：连接状态，整数类型，为1时成功，其他状态可以获取Message。
- Message(仅返回时携带)：连接消息。
- Route：是否路由，布尔类型，当TargetId不为空时应设为True。
- SourceId：源Id，字符串类型，当需要Ping其他客户端时，应该将该值设为自身Id，以保证返回的信息能够顺利路由回来。
- TargetId：目标Id，字符串类型，当需要Ping其他客户端时，应该将该值设为目标Id，以保证信息能够顺利送达。

```javascript
{"Sign":0,"Status":0,"Message":null,"Route":false,"SourceId":null,"TargetId":null}
```

【Close】

Close报文直接采用utf8编码的字符串。
```
"close"
```

:::note 备注

DMTP基础功能是比较容易跨语言的，但这并不意味着DMTP所有的功能都支持。其扩展功能，则需要实际的跨平台设计。不过依靠简单的Head+Flags+Length+Data的格式，也能自己实现很多的功能。

:::  

## 二、Id同步

在Dmtp中，存在于服务器的辅助客户端（SessionClient），与远程客户端（Client）是一一对应关系，其Id也**完全一致**。所以在任意一方修改Id（调用ResetId），都会同时修改远程Id。所以合理使用该操作，可以完成复用Id（重置Id）的需求。


## 三、发送数据

Dmtp提供协议发送数据，又叫协议扩展功能，就是对现有的Dmtp进行自定义的扩展协议。

使用起来是非常简单的，每个DmtpActor，都实现了Send方法接口。

第一个参数为ushort类型，使用者可以**约定任意大于20数值**。

```csharp showLineNumbers
client.SendAsync(1000,Encoding.UTF8.GetBytes("RRQM"));
```

:::caution 注意

Flags不要使用小于20的，因为框架内部在使用。并且小于100的也最好不要使用，因为可能其他组件也在使用。

:::  

在**接收方**订阅`IDmtpReceivedPlugin`，已经包含了协议参数，所以直接自行筛选即可。

```csharp showLineNumbers
internal class MyFlagsPlugin : PluginBase, IDmtpReceivedPlugin
{
    public async Task OnDmtpReceived(IDmtpActorObject client, DmtpMessageEventArgs e)
    {
        if (e.DmtpMessage.ProtocolFlags == 1000)
        {
            //判断完协议以后，从 e.DmtpMessage.BodyByteBlock可以拿到实际的数据
            string msg = e.DmtpMessage.BodyByteBlock.ToString();

            return;
        }

        //flags不满足，调用下一个插件
        await e.InvokeNext();
    }
}
```

## 四、使用Channel发送数据

Channel是DmtpActor的一个传输通道。一般来说，DmtpActor是一个逻辑连接（例如：一个Tcp、Udp、NamedPipe等）。而Channel则是更加细分DmtpActor的一个传输通道。

换言之，一个DmtpActor可以有多个Channel。多个Channel可以同时进行传输。并且数据互不干扰。

### 4.1 特点

1. 每个Channel都有独立的Id，所以多个Channel可以同时工作。
2. 每个Channel在被创建后，都支持双向读写。
3. 每个Channel都支持自定义限速和读写超时设定。

### 4.2 使用步骤

1. 由一方DmtpActor创建一个Channel，并记住Id。
2. 另一方DmtpActor使用这个Id订阅Channel。默认情况下，Channel在被创建时，接收端会触发IDmtpCreateChannelPlugin插件。此时可以直接订阅。
3. 发送方使用Channel发送（Write）数据
4. 接收方通过订阅的Channel，接收（Read）数据

下列以TcpDmtpClient发送、TcpDmtpService接收为例：

【创建客户端与服务器】

```csharp showLineNumbers
private static TcpDmtpClient GetTcpDmtpClient()
{
    var client = new TouchSocketConfig()
           .SetRemoteIPHost("127.0.0.1:7789")
           .SetDmtpOption(new DmtpOption()
           {
               VerifyToken = "Channel"
           })
           .SetSendTimeout(0)
           .ConfigureContainer(a =>
           {
               a.AddConsoleLogger();
           })
           .ConfigurePlugins(a =>
           {
               a.Add<MyPlugin>();
           })
           .BuildWithTcpDmtpClient();

    client.Logger.Info("连接成功");
    return client;
}

private static TcpDmtpService GetTcpDmtpService()
{
    var service = new TcpDmtpService();

    var config = new TouchSocketConfig()//配置
           .SetListenIPHosts(7789)
           .SetSendTimeout(0)
           .ConfigureContainer(a =>
           {
               a.AddConsoleLogger();
           })
           .ConfigurePlugins(a =>
           {
               a.Add<MyPlugin>();
           })
           .SetDmtpOption(new DmtpOption()
           {
               VerifyToken = "Channel"//连接验证口令。
           });

    await service.SetupAsync(config);
    await service.StartAsync();
    service.Logger.Info("服务器成功启动");
    return service;
}
```

【客户端创建Channel并写入数据】

Channel支持持续写入，且无长度限制。但是需要注意的是，在写入结束时，应该调用**终止语句**（下文详解），以结束写入并通知接收端。

```csharp {9,19,23} showLineNumbers
static void Main(string[] args)
{
    var service = GetTcpDmtpService();
    var client = GetTcpDmtpClient();

    var count = 1024 * 1;//测试1Gb数据

    //1.创建通道，同时支持通道路由和元数据传递
    using (var channel = client.CreateChannel())
    {
        //设置限速
        //channel.MaxSpeed = 1024 * 1024;

        ConsoleLogger.Default.Info($"通道创建成功，即将写入{count}Mb数据");
        var bytes = new byte[1024 * 1024];
        for (var i = 0; i < count; i++)
        {
            //2.持续写入数据
            channel.Write(bytes);
        }

        //3.在写入完成后调用终止指令。例如：Complete、Cancel、HoldOn、Dispose等
        channel.Complete();
        ConsoleLogger.Default.Info("通道写入结束");
    }

    while (true)
    {
        BytePool.Default.Clear();
        GC.Collect();
        Console.ReadKey();
    }

}
```


【服务器使用插件订阅Channel】

插件的主要作用是处理DmtpCreateChannel事件。在插件中，我们需要订阅Channel，然后接收Channel的数据，最后执行相应的操作。

不过这并不是必须的，如果你有其他逻辑能处理**订阅**，那么也可以不使用插件。例如：[DmtpRpc大数据传输](./dmtprpc.mdx)示例中，就使用了Rpc将创建的Channel的Id传递到响应端，然后响应端再处理订阅和读写。

```csharp showLineNumbers
internal class MyPlugin : PluginBase, IDmtpCreateChannelPlugin
{
    private readonly ILog m_logger;

    public MyPlugin(ILog logger)
    {
        this.m_logger = logger;
    }
    public async Task OnCreateChannel(IDmtpActorObject client, CreateChannelEventArgs e)
    {
        if (client.TrySubscribeChannel(e.ChannelId, out var channel))
        {
            //接收端也可以限速
            //channel.MaxSpeed = 1024 * 1024;
            using (channel)
            {
                long count = 0;
                this.m_logger.Info("通道开始接收");
                foreach (var byteBlock in channel)
                {
                    //这里处理数据
                    count += byteBlock.Len;
                    this.m_logger.Info($"通道已接收：{count}字节");
                }

                this.m_logger.Info($"通道接收结束，状态={channel.Status}，共接收{count / (1048576.0):0.00}Mb字节");
            }
        }

        await e.InvokeNext();
    }
}
```

### 4.3 终止语句

Channel的终止语句总共有4个：

- Complete：完成，表示写入结束，接收端会收到完成的状态。
- Cancel：取消，表示写入被取消，接收端会收到取消的状态。
- HoldOn：等待继续，表示写入被中断，但是可以恢复传输，接收端会跳出接收，但是可以重新进入接收。
- Dispose：释放，表示写入被释放，接收端会收到释放的状态。

其中Complete、Cancel、Dispose均为一次性终止语句，即调用以后，Channel将不可再被使用。如果想再使用Channel，则必须重新创建，重新订阅。

HoldOn为可恢复的终止语句，调用以后，Channel会进入等待状态，接收端会跳出接收，但是可以重新进入接收。这在多段数据传输传输时是有用的。

例如：

下列示例中，我们使用HoldOn来模拟多段数据传输。每次发送10段1024字节的数据，然后调用HoldOn，循环100次。

```csharp {10-19} showLineNumbers
//1.创建通道，同时支持通道路由和元数据传递
using (var channel = client.CreateChannel())
{
    //设置限速
    //channel.MaxSpeed = 1024 * 1024;

    ConsoleLogger.Default.Info($"通道创建成功，即将写入");
    var bytes = new byte[1024];

    for (var i = 0; i < 100; i++)//循环100次
    {
        for (int j = 0; j < 10; j++)
        {
            //2.持续写入数据
            channel.Write(bytes);
        }
        //3.在某个阶段完成数据传输时，可以调用HoldOn
        channel.HoldOn();
    }
    //4.在写入完成后调用终止指令。例如：Complete、Cancel、HoldOn、Dispose等
    channel.Complete();

    ConsoleLogger.Default.Info("通道写入结束");
}
```

这样接收端会收到100次10段1024字节的数据。

```csharp {16-27} showLineNumbers
internal class MyPlugin : PluginBase, IDmtpCreateChannelPlugin
{
    ...

    public async Task OnCreateChannel(IDmtpActorObject client, CreateChannelEventArgs e)
    {
        if (client.TrySubscribeChannel(e.ChannelId, out var channel))
        {
            //接收端也可以限速
            //channel.MaxSpeed = 1024 * 1024;
            using (channel)
            {
                this.m_logger.Info("通道开始接收");

                //此判断主要是探测是否有Hold操作
                while (channel.CanMoveNext)
                {
                    long count = 0;
                    foreach (var byteBlock in channel)
                    {
                        //这里处理数据
                        count += byteBlock.Len;
                        this.m_logger.Info($"通道已接收：{count}字节");
                    }

                    this.m_logger.Info($"通道接收结束，状态={channel.Status}，共接收{count / (1048576.0):0.00}Mb字节");
                }
                
            }
        }

        await e.InvokeNext();
    }
}
```

### 4.3 终止语句短语

我们在调用终止指令时，同时可以传递一个字符串短语。

例如：

```csharp showLineNumbers
channel.Complete("我完成了");
```

### 4.4 传输限速

传输限速，可以限制Channel的传输速度。

限速操作不仅可以在发送端设置，还可以在接收端设置。但是一般建议在发送端设置。因为在接收端设置的话，可能会有一些不稳定因素。

```csharp showLineNumbers
//设置限速
channel.MaxSpeed = 1024 * 1024;
```

### 4.5 传输超时

Channel是基于写入和读取的异步机制。如果写入方迟迟没有数据写入，那读取方可能在一段时间的等待后超时。

或者如果读取方迟迟没有数据读取，那写入方在写入一部分数据后，会发生拥塞，如果时间较长，也会发生等待超时。

当然我们可以设置超时时间：

```csharp showLineNumbers
//设定读取超时时间
channel.Timeout = TimeSpan.FromSeconds(30);
```

### 4.6 注意事项

1. Channel的传输是有序的。
2. Channel支持所有的Dmtp组件，但要求是该组件必须基于可靠协议，例如：UdpDmtp则不支持。
3. Channel拥有自己的拥塞控制，当发生拥塞的时候，不会阻塞底层协议。但是应该也要避免发生拥塞。
4. Channel在使用完成后，必须**显示释放**，所以建议使用using关键字。

[Channel示例Demo](https://gitee.com/RRQM_Home/TouchSocket/tree/master/examples/Dmtp/DmtpChannelConsoleApp)

## 五、使用心跳

Dmtp组件自带了`Ping`（或者`PingAsync`）方法。这个方法强制要求被`Ping`的一方无条件响应的。

所以，如果`Ping`返回`true`，则说明对方**必在线**。但是如果返回`false`，则**不一定代表对方不在线**。因为有可能是`Ping`超时，或者当前传输链路正在忙。

总之，使用者可以在业务中调用这个方法，来检测通讯是否通畅。

同时，库中基于此方法，封装了一个可用心跳插件。

其中可以配置**心跳间隔**和**最大失败次数**。

```csharp {3} showLineNumbers
.ConfigurePlugins(a =>
{
    a.UseDmtpHeartbeat()
    .SetTick(TimeSpan.FromSeconds(3))
    .SetMaxFailCount(3);
})
```

:::tip 提示

`Ping`可以由客户端或服务器任意一端发起，然后要求对方回应。但是一般我们建议由客户端发起。由服务器响应即可。如果有特殊要求，则自行解决即可。

:::  

## 六、使用断线重连

```csharp showLineNumbers
.ConfigurePlugins(a =>
{
    //使用重连
    a.UseReconnection<TcpDmtpClient>()
    .UsePolling()//使用轮询
    .SetActionForCheck(async (c, i) =>//重新定义检活策略
    {
        //方法1，直接判断是否在握手状态。使用该方式，最好和心跳插件配合使用。因为如果直接断网，则检测不出来
        //await Task.CompletedTask;//消除Task
        //return c.IsHandshaked;//判断是否在握手状态

        //方法2，直接ping，如果true，则客户端必在线。如果false，则客户端不一定不在线，原因是可能当前传输正在忙
        if (await c.PingAsync())
        {
            return true;
        }
        //返回false时可以判断，如果最近活动时间不超过3秒，则猜测客户端确实在忙，所以跳过本次重连
        else if (DateTime.Now - c.GetLastActiveTime() < TimeSpan.FromSeconds(3))
        {
            return null;
        }
        //否则，直接重连。
        else
        {
            return false;
        }
    })
    .SetTick(TimeSpan.FromSeconds(3));//每3秒检测一次
})
```